{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(remote-execution)=\n",
    "# Remote execution\n",
    "\n",
    "You can chain functions together with remote execution. This allows you to:\n",
    "- Call existing functions from the graph and reuse them from other graphs.\n",
    "- Scale up and down different components individually.\n",
    "\n",
    "Calling a remote function can either use HTTP or via a queue (streaming).\n",
    "\n",
    "**In this section**\n",
    "- [HTTP](#http)\n",
    "- [Queue (streaming)](#queue-streaming)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## HTTP\n",
    "\n",
    "Calling a function using http uses the special `$remote` class. First deploy the remote function:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-03-17 08:20:40,674 [info] Starting remote function deploy\n",
      "2022-03-17 08:20:40  (info) Deploying function\n",
      "2022-03-17 08:20:40  (info) Building\n",
      "2022-03-17 08:20:40  (info) Staging files and preparing base images\n",
      "2022-03-17 08:20:40  (info) Building processor image\n",
      "2022-03-17 08:20:42  (info) Build complete\n",
      "2022-03-17 08:20:47  (info) Function deploy complete\n",
      "> 2022-03-17 08:20:48,289 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-graph-basic-concepts-serving-example-flow.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['graph-basic-concepts-serving-example-flow-graph-basic-concepts.default-tenant.app.maor-gcp2.iguazio-cd0.com/']}\n"
     ]
    }
   ],
   "source": [
    "remote_func_name = \"serving-example-flow\"\n",
    "fn_remote = project.set_function(\n",
    "    name=remote_func_name, kind=\"serving\", image=\"mlrun/mlrun\"\n",
    ")\n",
    "\n",
    "fn_remote.add_model(\n",
    "    \"model1\",\n",
    "    class_name=\"ClassifierModel\",\n",
    "    model_path=\"https://s3.wasabisys.com/iguazio/models/iris/model.pkl\",\n",
    ")\n",
    "\n",
    "remote_addr = fn_remote.deploy()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a new function with a graph and call the remote function above:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"370pt\" height=\"44pt\"\n",
       " viewBox=\"0.00 0.00 370.28 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-40 366.2796,-40 366.2796,4 -4,4\"/>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-.0493 40.698,-.1479 42.8263,-.2953 44.9236,-.4913 46.9815,-.7353 48.9917,-1.0266 50.9463,-1.3645 52.8377,-1.7479 54.6587,-2.1759 56.4025,-2.6472 58.0628,-3.1606 59.634,-3.7147 61.1107,-4.308 62.4882,-4.9388 63.7625,-5.6054 64.9302,-6.3059 65.9882,-7.0385 66.9343,-7.8012 67.7669,-8.5918 68.4849,-9.4082 69.0878,-10.2481 69.5758,-11.1093 69.9496,-11.9894 70.2102,-12.886 70.3595,-13.7965 70.3997,-14.7186 70.3334,-15.6497 70.1636,-16.5873 69.8937,-17.5287 69.5276,-18.4713 69.0691,-19.4127 68.5225,-20.3503 67.8923,-21.2814 67.1831,-22.2035 66.3996,-23.114 65.5464,-24.0106 64.6285,-24.8907 63.6504,-25.7519 62.617,-26.5918 61.5329,-27.4082 60.4024,-28.1988 59.2299,-28.9615 58.0197,-29.6941 56.7755,-30.3946 55.5012,-31.0612 54.2002,-31.692 52.8757,-32.2853 51.5309,-32.8394 50.1684,-33.3528 48.7908,-33.8241 47.4003,-34.2521 45.9989,-34.6355 44.5886,-34.9734 43.1708,-35.2647 41.7472,-35.5087 40.3189,-35.7047 38.8872,-35.8521 37.4531,-35.9507 36.0175,-36 34.5815,-36 33.146,-35.9507 31.7119,-35.8521 30.2801,-35.7047 28.8519,-35.5087 27.4282,-35.2647 26.0105,-34.9734 24.6001,-34.6355 23.1988,-34.2521 21.8083,-33.8241 20.4306,-33.3528 19.0681,-32.8394 17.7233,-32.2853 16.3989,-31.692 15.0979,-31.0612 13.8236,-30.3946 12.5794,-29.6941 11.3691,-28.9615 10.1967,-28.1988 9.0662,-27.4082 7.982,-26.5918 6.9486,-25.7519 5.9706,-24.8907 5.0526,-24.0106 4.1995,-23.114 3.4159,-22.2035 2.7067,-21.2814 2.0765,-20.3503 1.53,-19.4127 1.0715,-18.4713 .7053,-17.5287 .4355,-16.5873 .2657,-15.6497 .1993,-14.7186 .2395,-13.7965 .3888,-12.886 .6495,-11.9894 1.0232,-11.1093 1.5112,-10.2481 2.1141,-9.4082 2.8321,-8.5918 3.6647,-7.8012 4.6109,-7.0385 5.6689,-6.3059 6.8365,-5.6054 8.1108,-4.9388 9.4884,-4.308 10.9651,-3.7147 12.5362,-3.1606 14.1966,-2.6472 15.9404,-2.1759 17.7614,-1.7479 19.6528,-1.3645 21.6074,-1.0266 23.6176,-.7353 25.6755,-.4913 27.7728,-.2953 29.901,-.1479 32.0515,-.0493 34.2154,0 36.3837,0 38.5476,-.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- enrich -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>enrich</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"146.8955\" cy=\"-18\" rx=\"40.0939\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"146.8955\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;enrich -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;enrich</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.9975,-18C78.2752,-18 87.2796,-18 96.1148,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.3553,-21.5001 106.3553,-18 96.3552,-14.5001 96.3553,-21.5001\"/>\n",
       "</g>\n",
       "<!-- remote_func -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>remote_func</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"292.7357\" cy=\"-18\" rx=\"69.5877\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"292.7357\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">remote_func</text>\n",
       "</g>\n",
       "<!-- enrich&#45;&gt;remote_func -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>enrich&#45;&gt;remote_func</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M187.5201,-18C195.4559,-18 204.0312,-18 212.7424,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"213.0009,-21.5001 223.0008,-18 213.0008,-14.5001 213.0009,-21.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7f57dc96a0d0>"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn_preprocess = project.set_function(name=\"preprocess\", kind=\"serving\")\n",
    "graph_preprocessing = fn_preprocess.set_topology(\"flow\")\n",
    "\n",
    "graph_preprocessing.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})').to(\n",
    "    \"$remote\", \"remote_func\", url=f\"{remote_addr}v2/models/model1/infer\", method=\"put\"\n",
    ").respond()\n",
    "\n",
    "graph_preprocessing.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-03-17 08:20:48,374 [warning] run command, file or code were not specified\n",
      "{'id': '3a1dd36c-e7de-45af-a0c4-72e3163ba92a', 'model_name': 'model1', 'outputs': [0, 2]}\n"
     ]
    }
   ],
   "source": [
    "fn3_server = fn_preprocess.to_mock_server()\n",
    "my_data = \"\"\"{\"inputs\":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}\"\"\"\n",
    "result = fn3_server.test(\"/v2/models/my_model/infer\", body=my_data)\n",
    "fn3_server.wait_for_completion()\n",
    "print(result)"
   ]
  },
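  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The URL and request body used above can also be built programmatically. The following is a minimal sketch using only the standard library; `infer_url` is a hypothetical helper (not part of MLRun), and it assumes the `v2/models/<model>/infer` path shown in the graph definition.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "\n",
    "def infer_url(base_addr, model_name):\n",
    "    # Hypothetical helper: join the function address and the v2 inference path,\n",
    "    # tolerating a trailing slash on the address.\n",
    "    base = base_addr.rstrip('/')\n",
    "    return f'{base}/v2/models/{model_name}/infer'\n",
    "\n",
    "\n",
    "# Serialize the request body instead of hand-writing the JSON string.\n",
    "body = json.dumps({'inputs': [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]]})\n",
    "\n",
    "print(infer_url('http://example.invalid/', 'model1'))\n",
    "print(body)\n",
    "```\n",
    "\n",
    "Stripping the trailing slash first means the helper works whether or not the deployed address ends with `/`."
   ]
  },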
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Queue (streaming)\n",
    "\n",
    "You can use queues to send events from one part of the graph to another and to decouple the processing of those parts.\n",
    "Queues are better suited to deal with bursts of events, since all the events are stored in the queue until they are processed.\n",
    "\n",
    "queue or stream that accepts data from one or more source steps and publishes to one or more output steps. \n",
    "    Queues are best used to connect independent functions/containers. Queues can run in-memory or be implemented using a stream, which allows it to span processes/containers. Currently queues support Iguazio v3io and Kafka streams."
   ]
  },
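  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The burst-absorbing behavior described above can be illustrated with Python's standard `queue` module. This is only a sketch of the idea, not how MLRun or V3IO implement queues; `run_burst` and its arguments are hypothetical names.\n",
    "\n",
    "```python\n",
    "import queue\n",
    "import threading\n",
    "\n",
    "\n",
    "def run_burst(events, handler):\n",
    "    # Push a burst of events into a queue and drain it with a single worker.\n",
    "    q = queue.Queue()  # unbounded in-memory queue, standing in for a stream\n",
    "    results = []\n",
    "\n",
    "    def worker():\n",
    "        while True:\n",
    "            item = q.get()\n",
    "            if item is None:  # sentinel: no more events\n",
    "                break\n",
    "            results.append(handler(item))\n",
    "\n",
    "    consumer = threading.Thread(target=worker)\n",
    "    consumer.start()\n",
    "    for event in events:  # producer: enqueue the whole burst without waiting\n",
    "        q.put(event)\n",
    "    q.put(None)\n",
    "    consumer.join()\n",
    "    return results\n",
    "\n",
    "\n",
    "print(run_burst(range(5), lambda x: x * 2))\n",
    "```\n",
    "\n",
    "The producer loop does not wait for `handler`, because the events stay in the queue until the worker picks them up."
   ]
  },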
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(serving-v3io-stream-example)=\n",
    "### V3IO stream example\n",
    "The example below uses a V3IO stream, which is a fast real-time implementation of a stream that allows processing of events at very low latency."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting echo.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile echo.py\n",
    "def echo_handler(x):\n",
    "    print(x)\n",
    "    return x"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the streams:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "streams_prefix = (\n",
    "    f\"v3io:///users/{os.getenv('V3IO_USERNAME')}/examples/graph-basic-concepts\"\n",
    ")\n",
    "\n",
    "input_stream = streams_prefix + \"/in-stream\"\n",
    "out_stream = streams_prefix + \"/out-stream\"\n",
    "err_stream = streams_prefix + \"/err-stream\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the graph. In the `to` method the class name is one of `>>` or `$queue` to specify that this is a queue. To configure a consumer group for the step, include the group in the `to` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"602pt\" height=\"44pt\"\n",
       " viewBox=\"0.00 0.00 602.19 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-40 598.1861,-40 598.1861,4 -4,4\"/>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-.0493 40.698,-.1479 42.8263,-.2953 44.9236,-.4913 46.9815,-.7353 48.9917,-1.0266 50.9463,-1.3645 52.8377,-1.7479 54.6587,-2.1759 56.4025,-2.6472 58.0628,-3.1606 59.634,-3.7147 61.1107,-4.308 62.4882,-4.9388 63.7625,-5.6054 64.9302,-6.3059 65.9882,-7.0385 66.9343,-7.8012 67.7669,-8.5918 68.4849,-9.4082 69.0878,-10.2481 69.5758,-11.1093 69.9496,-11.9894 70.2102,-12.886 70.3595,-13.7965 70.3997,-14.7186 70.3334,-15.6497 70.1636,-16.5873 69.8937,-17.5287 69.5276,-18.4713 69.0691,-19.4127 68.5225,-20.3503 67.8923,-21.2814 67.1831,-22.2035 66.3996,-23.114 65.5464,-24.0106 64.6285,-24.8907 63.6504,-25.7519 62.617,-26.5918 61.5329,-27.4082 60.4024,-28.1988 59.2299,-28.9615 58.0197,-29.6941 56.7755,-30.3946 55.5012,-31.0612 54.2002,-31.692 52.8757,-32.2853 51.5309,-32.8394 50.1684,-33.3528 48.7908,-33.8241 47.4003,-34.2521 45.9989,-34.6355 44.5886,-34.9734 43.1708,-35.2647 41.7472,-35.5087 40.3189,-35.7047 38.8872,-35.8521 37.4531,-35.9507 36.0175,-36 34.5815,-36 33.146,-35.9507 31.7119,-35.8521 30.2801,-35.7047 28.8519,-35.5087 27.4282,-35.2647 26.0105,-34.9734 24.6001,-34.6355 23.1988,-34.2521 21.8083,-33.8241 20.4306,-33.3528 19.0681,-32.8394 17.7233,-32.2853 16.3989,-31.692 15.0979,-31.0612 13.8236,-30.3946 12.5794,-29.6941 11.3691,-28.9615 10.1967,-28.1988 9.0662,-27.4082 7.982,-26.5918 6.9486,-25.7519 5.9706,-24.8907 5.0526,-24.0106 4.1995,-23.114 3.4159,-22.2035 2.7067,-21.2814 2.0765,-20.3503 1.53,-19.4127 1.0715,-18.4713 .7053,-17.5287 .4355,-16.5873 .2657,-15.6497 .1993,-14.7186 .2395,-13.7965 .3888,-12.886 .6495,-11.9894 1.0232,-11.1093 1.5112,-10.2481 2.1141,-9.4082 2.8321,-8.5918 3.6647,-7.8012 4.6109,-7.0385 5.6689,-6.3059 6.8365,-5.6054 8.1108,-4.9388 9.4884,-4.308 10.9651,-3.7147 12.5362,-3.1606 14.1966,-2.6472 15.9404,-2.1759 17.7614,-1.7479 19.6528,-1.3645 21.6074,-1.0266 23.6176,-.7353 25.6755,-.4913 27.7728,-.2953 29.901,-.1479 32.0515,-.0493 34.2154,0 36.3837,0 38.5476,-.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- enrich -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>enrich</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"146.8955\" cy=\"-18\" rx=\"40.0939\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"146.8955\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;enrich -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;enrich</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.9975,-18C78.2752,-18 87.2796,-18 96.1148,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.3553,-21.5001 106.3553,-18 96.3552,-14.5001 96.3553,-21.5001\"/>\n",
       "</g>\n",
       "<!-- input_stream -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>input_stream</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"323.1919,-30 223.1919,-30 223.1919,-6 323.1919,-6 335.1919,-18 323.1919,-30\"/>\n",
       "<text text-anchor=\"middle\" x=\"279.1919\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">input_stream</text>\n",
       "</g>\n",
       "<!-- enrich&#45;&gt;input_stream -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>enrich&#45;&gt;input_stream</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M187.3076,-18C195.3909,-18 204.0816,-18 212.7784,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"212.9545,-21.5001 222.9544,-18 212.9544,-14.5001 212.9545,-21.5001\"/>\n",
       "</g>\n",
       "<!-- echo -->\n",
       "<g id=\"node4\" class=\"node\">\n",
       "<title>echo</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"403.689\" cy=\"-18\" rx=\"32.4942\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"403.689\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">echo</text>\n",
       "</g>\n",
       "<!-- input_stream&#45;&gt;echo -->\n",
       "<g id=\"edge3\" class=\"edge\">\n",
       "<title>input_stream&#45;&gt;echo</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M335.249,-18C343.8712,-18 352.6535,-18 360.9252,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"361.0793,-21.5001 371.0793,-18 361.0792,-14.5001 361.0793,-21.5001\"/>\n",
       "</g>\n",
       "<!-- output_stream -->\n",
       "<g id=\"node5\" class=\"node\">\n",
       "<title>output_stream</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"582.1861,-30 472.1861,-30 472.1861,-6 582.1861,-6 594.1861,-18 582.1861,-30\"/>\n",
       "<text text-anchor=\"middle\" x=\"533.1861\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">output_stream</text>\n",
       "</g>\n",
       "<!-- echo&#45;&gt;output_stream -->\n",
       "<g id=\"edge4\" class=\"edge\">\n",
       "<title>echo&#45;&gt;output_stream</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M436.3666,-18C444.2602,-18 453.0047,-18 461.9156,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"462.0105,-21.5001 472.0105,-18 462.0105,-14.5001 462.0105,-21.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7f57c7907990>"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn_preprocess2 = project.set_function(\"preprocess\", kind=\"serving\")\n",
    "fn_preprocess2.add_child_function(\"echo_func\", \"./echo.py\", \"mlrun/mlrun\")\n",
    "\n",
    "graph_preprocess2 = fn_preprocess2.set_topology(\"flow\")\n",
    "\n",
    "graph_preprocess2.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})').to(\n",
    "    \">>\", \"input_stream\", path=input_stream, group=\"mygroup\"\n",
    ").to(name=\"echo\", handler=\"echo_handler\", function=\"echo_func\").to(\n",
    "    \">>\", \"output_stream\", path=out_stream, sharding_func=\"partition\"\n",
    ")\n",
    "\n",
    "graph_preprocess2.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-03-17 08:20:55,182 [warning] run command, file or code were not specified\n",
      "{'id': 'a6efe8217b024ec7a7e02cf0b7850b91'}\n",
      "{'inputs': [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], 'tag': 'something'}\n"
     ]
    }
   ],
   "source": [
    "from echo import *\n",
    "\n",
    "fn4_server = fn_preprocess2.to_mock_server(current_function=\"*\")\n",
    "\n",
    "my_data = \"\"\"{\"inputs\": [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], \"partition\": 0}\"\"\"\n",
    "\n",
    "result = fn4_server.test(\"/v2/models/my_model/infer\", body=my_data)\n",
    "fn4_server.wait_for_completion()\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(serving-kafka-stream-example)=\n",
    "### Kafka stream example\n",
    "You can also use Kafka to configure the streams."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting echo.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile echo.py\n",
    "def echo_handler(x):\n",
    "    print(x)\n",
    "    return x"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the streams"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "kafka_prefix = f\"kafka://{broker}/\"\n",
    "internal_topic = kafka_prefix + \"in-topic\"\n",
    "out_topic = kafka_prefix + \"out-topic\"\n",
    "err_topic = kafka_prefix + \"err-topic\"\n",
    "\n",
    "# replace this\n",
    "brokers = \"<broker IP>\""
   ]
  },
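  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The topic paths above take the form `kafka://<broker>/<topic>`. As a quick sanity check before wiring them into a graph, the sketch below splits such a URL with the standard library and rejects one that was prefixed twice. `split_kafka_url` is a hypothetical helper, not an MLRun API.\n",
    "\n",
    "```python\n",
    "from urllib.parse import urlparse\n",
    "\n",
    "\n",
    "def split_kafka_url(url):\n",
    "    # Return (brokers, topic) for a URL of the form kafka://<brokers>/<topic>.\n",
    "    parsed = urlparse(url)\n",
    "    if parsed.scheme != 'kafka':\n",
    "        raise ValueError(f'not a kafka URL: {url}')\n",
    "    topic = parsed.path.lstrip('/')\n",
    "    # A topic that itself contains a scheme usually means a doubled prefix.\n",
    "    if not parsed.netloc or not topic or '://' in topic:\n",
    "        raise ValueError(f'malformed kafka URL: {url}')\n",
    "    return parsed.netloc, topic\n",
    "\n",
    "\n",
    "print(split_kafka_url('kafka://localhost:9092/in-topic'))\n",
    "```\n",
    "\n",
    "The address `localhost:9092` is only a placeholder for illustration."
   ]
  },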
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the graph. In the `to` method the class name is one of `>>` or `$queue` to specify that this is a queue. To configure a consumer group for the step, include the group in the `to` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import mlrun\n",
    "\n",
    "fn_preprocess2 = project.set_function(\"preprocess\", kind=\"serving\")\n",
    "fn_preprocess2.add_child_function(\"echo_func\", \"./echo.py\", \"mlrun/mlrun\")\n",
    "\n",
    "graph_preprocess2 = fn_preprocess2.set_topology(\"flow\")\n",
    "\n",
    "graph_preprocess2.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})').to(\n",
    "    \">>\",\n",
    "    \"input_stream\",\n",
    "    path=input_topic,\n",
    "    group=\"mygroup\",\n",
    "    kafka_brokers=brokers,\n",
    ").to(name=\"echo\", handler=\"echo_handler\", function=\"echo_func\").to(\n",
    "    \">>\", \"output_stream\", path=out_topic, kafka_brokers=brokers\n",
    ")\n",
    "\n",
    "graph_preprocess2.plot(rankdir=\"LR\")\n",
    "\n",
    "from echo import *\n",
    "\n",
    "fn4_server = fn_preprocess2.to_mock_server(current_function=\"*\")\n",
    "\n",
    "fn4_server.set_error_stream(f\"kafka://{brokers}/{err_topic}\")\n",
    "\n",
    "my_data = \"\"\"{\"inputs\":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}\"\"\"\n",
    "\n",
    "result = fn4_server.test(\"/v2/models/my_model/infer\", body=my_data)\n",
    "fn4_server.wait_for_completion()\n",
    "\n",
    "print(result)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
